package com.huai.jt1078.endpoint;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.huai.jt1078.entity.AudioInfo;
import com.huai.jt1078.utils.ByteUtil;
import com.huai.jt1078.utils.SessionManager;
import com.huai.jt1078.utils.WebsocketUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xingkong
 * @program jt1078
 * @description 双向对讲websocket
 * @date 2021-09-16 14:29
 **/
@Slf4j
@Component
@ServerEndpoint(value = "/ws/intercom/{tag}")
public class IntercomWebSocket {

    /** 记录当前在线连接数 */
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    public static final Map<String, Session> TAG_MAP = Collections.synchronizedMap(new HashMap<>());
    private static final Map<Session, String> SESSION_MAP = Collections.synchronizedMap(new HashMap<>());
    private static final Map<String, List<byte[]>> MESSAGE_MAP = Collections.synchronizedMap(new HashMap<>());


    @OnOpen
    public synchronized void onOpen(Session session, @PathParam("tag") String tag) {
        if (TAG_MAP.containsKey(tag)){
//            throw new RuntimeException("当前已存在语音对讲，请稍后");
        }
        ONLINE_COUNT.incrementAndGet(); // 在线数加1
        log.info("有新连接加入：{}，当前在线人数为：{}", tag, ONLINE_COUNT.get());
        TAG_MAP.put(tag,session);
        SESSION_MAP.put(session,tag);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public synchronized void onClose(Session session) {
        String tag = SESSION_MAP.get(session);
        if (StrUtil.isNotBlank(tag)){
            TAG_MAP.remove(tag);
            ONLINE_COUNT.decrementAndGet(); // 在线数减1
        }
        SESSION_MAP.remove(session);
        log.info("有一连接关闭：{}，当前在线人数为：{}", tag, ONLINE_COUNT.get());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     *            客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(byte[] message, Session session) throws Exception{
        // 拼接数据
        String tag = SESSION_MAP.get(session);

        // 定义一个数据
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeBytes(message);
        // 读取当前序号
        int index = ByteUtil.readIntDesc(buffer);
        // 读取总序
        int allIndex = ByteUtil.readIntDesc(buffer);
        // 读取总字节数
        int allData = ByteUtil.readIntDesc(buffer);

        List<byte[]> data = MESSAGE_MAP.get(tag);
        if (CollectionUtil.isEmpty(data)){
            data = new ArrayList<>();
        }
        data.add(ByteUtil.readReadableBytes(buffer));
        if (index < allIndex){
            MESSAGE_MAP.put(tag,data);
            return;
        }
        int dataIndex = 0;
        byte[] pcmData = new byte[allData];
        for (byte[] tempData : data) {
            System.arraycopy(tempData, 0, pcmData, dataIndex, tempData.length);
            dataIndex += tempData.length;
        }
        MESSAGE_MAP.remove(tag);



        WebsocketUtil websocketUtil = SessionManager.instance.getWebsocketUtilMap().get(tag);
        if (null == websocketUtil){
            throw new RuntimeException("双向对讲通道已关闭");
        }
        String[] split = tag.split("-");

        if (pcmData.length > 320){
            int squeue = 0;
            int time = 0;
            int len = -1;
            byte[] block = new byte[320];
            // 字节数组转字节流
            InputStream fis = new ByteArrayInputStream(pcmData);
            while ((len = fis.read(block)) > -1) {
                AudioInfo audioInfo = AudioInfo.builder()
                        .simNo(split[0]).channelNo(Integer.parseInt(split[1]))
                        .data(websocketUtil.getAudioCodec().fromPCM(block)).loadType(6).sequence(squeue).timeStamp(time).build();
                squeue++;
                log.info("len :{}  squeue:{} time:{}",len,squeue,time);
                time += 20;
                websocketUtil.getContext().writeAndFlush(audioInfo);
                Thread.sleep(20);
            }

        }else{
            AudioInfo audioInfo = AudioInfo.builder()
                    .simNo(split[0]).channelNo(Integer.parseInt(split[1]))
                    .data(websocketUtil.getAudioCodec().fromPCM(pcmData)).loadType(6).sequence(websocketUtil.getSequence()).timeStamp(websocketUtil.getTime()).build();
            websocketUtil.getContext().writeAndFlush(audioInfo);
        }
        this.sendMessage("ok, 我收到了", session,tag);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 服务端发送消息给客户端
     */
    private void sendMessage(String message, Session toSession,String tag) {
        try {
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败：", e);
        }
    }

}
